refactor(block/internal/syncing): Catch-up, then launch tip-tracking routines #2802
Conversation
Tests are failing.
The latest Buf updates on your PR. Results from workflow CI and Release / buf-check (pull_request).
Claude finished @renaynay's task

**Code Review Complete**

I've completed a comprehensive review of this synchronization refactor. The tests are still running, but I've identified several issues that need attention.

**Critical Issues**

**1. Interface Mismatch - SyncHead Method Signature**

Location: the `Broadcaster` interface declares

```go
SyncHead() (H, error)
```

but the actual implementation has:

```go
func (syncService *SyncService[H]) SyncHead(ctx context.Context) (H, error)
```

Impact: This will cause compilation failures. The syncer calls `SyncHead()` without a context argument.

Fix Required: Either add the `context.Context` parameter to the interface method or drop it from the implementation so the two signatures match.
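A minimal sketch of the first option, assuming the go-header and libp2p pubsub import paths commonly used with ev-node; this is illustrative, not the PR's code:

```go
import (
    "context"

    "github.com/celestiaorg/go-header"
    pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// Option A: give the interface the same signature as the SyncService implementation.
type Broadcaster[H header.Header[H]] interface {
    WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
    Store() header.Store[H]
    SyncHead(ctx context.Context) (H, error) // was: SyncHead() (H, error)
}
```

Call sites would then pass the syncer's context, e.g. `s.headerStore.SyncHead(s.ctx)`.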
**High Priority Issues**

**2. Missing Ticker in waitForNewP2PHeights**

Location: the ticker is created at line 414 but never used in the select statement:

```go
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()

for {
    select {
    case <-s.ctx.Done():
        return
    default: // ← Problem: no ticker case
    }
    // ... processing logic
}
```

Impact: This creates a tight busy loop that will consume 100% CPU. The code will continuously process without any throttling.

Fix Required:

```go
for {
    select {
    case <-s.ctx.Done():
        return
    case <-ticker.C:
        // Move processing logic here
    }
}
```

**3. Potential Goroutine Leak in syncLoop**

Location: the code creates a `sync.WaitGroup` but calls `Go()` on it:

```go
wg := sync.WaitGroup{} // ← This is sync.WaitGroup, not errgroup.Group
// ...
wg.Go(func() { // ← sync.WaitGroup doesn't have a Go() method
```

Impact: Compilation error; `Go()` needs an `errgroup.Group`.

Fix Required: `wg := errgroup.Group{}`
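A self-contained sketch of the errgroup pattern the fix points at, which also covers the error propagation asked for in issue 4 below; `processRange` is an illustrative stand-in for `ProcessHeaderRange`/`ProcessDataRange`, assumed here to be changed to return an error:

```go
package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

// processRange stands in for ProcessHeaderRange / ProcessDataRange,
// changed to return an error instead of only logging failures.
func processRange(ctx context.Context, from, to uint64) error {
    for h := from; h <= to; h++ {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // fetch and forward height h here
        }
    }
    return nil
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    // Catch-up: run the header and data ranges concurrently and collect errors.
    g.Go(func() error { return processRange(ctx, 1, 100) })
    g.Go(func() error { return processRange(ctx, 1, 100) })

    // Wait returns the first non-nil error from any goroutine.
    if err := g.Wait(); err != nil {
        fmt.Println("catch-up failed:", err)
    }
}
```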
**4. Missing Error Handling in syncDARange**

Location: `syncDARange`. Errors from the range processing are not propagated.

Fix Required: Return errors from the range-processing calls so the caller can handle them (see the errgroup sketch above).

**Concurrency & Race Conditions**

**5. Race Condition in daRetrievalLoop**

Location: the loop has both a ticker case and a default case:

```go
for {
    select {
    case <-s.ctx.Done():
        return
    case <-ticker.C:
        s.tryFetchFromDA(nextDARequestAt)
        s.processPendingEvents()
    default: // ← This creates a busy loop
    }
}
```

Impact: The empty `default` case defeats the ticker and turns this into a busy loop, just like issue 2.

**6. Atomic Operations Without Synchronization**

Location: throughout. The code uses atomic operations for some state, but there are check-then-use patterns on store heights, for example:

```go
// syncer.go:277-282
if s.headerStore.Store().Height() < startHeight {
    wg.Go(func() {
        s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height(), startHeight+1, s.heightInCh)
    })
}
```

Concern: The store height can change between the check and the call inside the goroutine, so the range passed to `ProcessHeaderRange` may not be the range that was checked.
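A hedged fragment showing one way to narrow that gap, reusing the names from the snippet above; whether these are the intended range bounds is up to the author:

```go
// Read the store height once so the checked value and the value used inside
// the goroutine are guaranteed to be the same.
storeHeight := s.headerStore.Store().Height()
if storeHeight < startHeight {
    wg.Go(func() {
        s.p2pHandler.ProcessHeaderRange(s.ctx, storeHeight, startHeight+1, s.heightInCh)
    })
}
```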
**Architecture & Design**

**7. Blocking Catch-up May Timeout**

Location:

```go
ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: you can adjust timeouts to realistic
startHeight, err := s.store.Height(ctx)
cancel()
```

Concern: The 1-second timeout during catch-up initialization seems too short. If the store operation is slow (e.g., cold start, large DB), this will fail unnecessarily.

Recommendation: Either increase the timeout or use the parent context without a timeout during initialization.

**8. Context Propagation Change**

Location: the PR description mentions "pass syncer's context in there instead of artificially lowering timeout on ctx". This is good, but there are still artificial timeouts in the data retrieval path:

```go
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*2)
retrievedData, err := h.dataStore.GetByHeight(timeoutCtx, height)
```

Concern: This is inconsistent with the stated goal. If the header store implicitly waits, why does the data store need a timeout?

Recommendation: Document why data needs a timeout but headers don't, or make them consistent.

**Error Handling**

**9. Silent Failure in waitForNewP2PHeights**

Location: when `SyncHead()` fails:

```go
syncHeadHeader, err := s.headerStore.SyncHead()
if err != nil {
    s.logger.Error().Err(err).Msg("failed to get header p2p sync head")
    continue
}
```

Concern: With the busy loop issue (issue 2), this will spam logs continuously if `SyncHead()` keeps failing.

Recommendation: Add backoff or error counting to avoid log spam.
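A hedged sketch of the suggested backoff; `syncHead`, `logger`, and `processNewHead` are stand-ins for the syncer's own fields and logic, and the intervals are illustrative:

```go
backoff := 100 * time.Millisecond
const maxBackoff = 10 * time.Second

for {
    head, err := syncHead(ctx) // stand-in for s.headerStore.SyncHead
    if err != nil {
        logger.Error().Err(err).Dur("retry_in", backoff).Msg("failed to get header p2p sync head")
        select {
        case <-ctx.Done():
            return
        case <-time.After(backoff):
        }
        // Double the delay after each consecutive failure, up to a cap.
        backoff *= 2
        if backoff > maxBackoff {
            backoff = maxBackoff
        }
        continue
    }
    backoff = 100 * time.Millisecond // reset once SyncHead succeeds
    processNewHead(head)             // stand-in for the existing processing logic
}
```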
**10. Context Cancellation Not Checked**

Location:

```go
s.p2pHandler.ProcessHeaderRange(s.ctx, s.headerStore.Store().Height()+1, syncHeadHeader.Height(), s.heightInCh)
```

If the context is cancelled while a large range is being processed, the cancellation is not checked between heights.
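A hedged fragment of the kind of check this asks for, assuming `ProcessHeaderRange` iterates heights in a loop; the loop body is illustrative:

```go
for h := from; h < to; h++ {
    select {
    case <-ctx.Done():
        return // stop promptly when the syncer shuts down mid-range
    default:
    }
    // ... fetch header h and forward it to heightInCh
}
```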
**Testing & Reliability**

**11. Test Coverage for New Flow**

The new two-phase approach (catch-up → tip-tracking) is a significant architectural change. Ensure tests cover the catch-up phase, the hand-off from catch-up to tip-tracking, and the tip-tracking loops themselves.

**12. Missing Integration Test**

The interaction between the DA retrieval loop and the P2P tip-tracking loop should be covered by an integration test.

**Performance**

**13. Channel Buffer Size**

Location: `heightInCh: make(chan common.DAHeightEvent, 10_000),`

Observation: a 10,000-element buffer is quite large. Consider whether this is necessary or whether backpressure should be applied earlier.

**14. Pending Events Processing**

Location: `processPendingEvents()`

Concern: If both tip-tracking routines call this simultaneously, there could be contention on the cache. Consider whether this needs synchronization or whether the cache is already thread-safe.

**Code Quality**

**15. TODOs in Production Code**

Locations: multiple, for example:

```go
// syncer.go:263
ctx, cancel := context.WithTimeout(s.ctx, time.Second) // TODO @tac0turtle: you can adjust timeouts to realistic
// syncer.go:342
// TODO @tac0turtle, changed it to fire on Celestia blocktime
// syncer.go:413
// TODO @tac0turtle: ev-node expected blocktime here
// p2p_handler.go:58, 81, 144
// TODO @tac0turtle: it is desirable to block here...
```

Recommendation: Either resolve these TODOs before merging or create tracking issues.

**16. Magic Numbers**

(e.g., the 100 ms ticker, the 2 s data timeout, and the 10,000-element channel buffer noted above.)

**Security**

**17. No Rate Limiting on DA Requests**

While there's backoff on errors, there's no protection against rapid DA queries during normal operation. If the Celestia block time is 6s but events arrive faster, the node might overload the DA layer.

Recommendation: Consider adding rate limiting or a minimum time between requests.
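A hedged sketch of a minimum-interval guard between DA requests; the type and its fields are illustrative, and the 6s figure comes from the Celestia blocktime mentioned above:

```go
import (
    "sync"
    "time"
)

// daRateLimiter enforces a minimum spacing between DA queries, regardless of
// how quickly new height events arrive.
type daRateLimiter struct {
    mu       sync.Mutex
    last     time.Time
    minEvery time.Duration // e.g. the Celestia blocktime (6s)
}

// allow reports whether a DA request may be issued now, recording the time if so.
func (r *daRateLimiter) allow(now time.Time) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
    if now.Sub(r.last) < r.minEvery {
        return false
    }
    r.last = now
    return true
}
```

The DA loop would call `allow(time.Now())` before `tryFetchFromDA` and skip the request otherwise; `golang.org/x/time/rate` is an off-the-shelf alternative.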
**Documentation**

**18. Missing Function Documentation**

Key functions lack documentation.

**Positive Observations**

✅ Good separation of concerns - the P2P handler is cleanly separated.

**Summary**

- Blockers (must fix): the compilation errors (issues 1 and 3).
- High Priority: the busy loops (issues 2 and 5) and the missing error handling (issue 4).
- Recommendations: the remaining observations above.

The architectural direction is sound, but the implementation needs fixes before merging. Once the compilation issues are resolved, I recommend running the full test suite and benchmarks to validate performance characteristics.
```go
type Broadcaster[H header.Header[H]] interface {
    WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
    Store() header.Store[H]
    SyncHead() (H, error)
}
```
This will tell you what the latest view of the network head is, BTW (it's the same as listening to gossipsub).
The main takeaway from this PR is that we should have sync vs. following modes. This will be easier to handle when we have celestia-node subscriptions to a namespace. Currently the code difference between sync and follow isn't that far off, so I'd hold off on the separation until we have the celestia-node API integrated. I do tend to prefer the event-driven design in #2798: while I understand this is effectively what go-header is also doing under the hood when we use GetByHeight, the event-driven approach uses fewer resources because we don't need to keep long-lasting requests open. I understand that this is a micro-optimization at this point as well. Thanks for the PR; we will come back to it in the future, but for now will close it.
@tac0turtle, can you clarify which resources the Get call would use that the new event system would not? For now, the justification for building another event-system layer that duplicates existing functionality seems like a stretch. The simple Get call is both simpler and easier, with less code and complexity. While we are not code owners in this repo, it would still be great to have a benchmark that justifies another event-system layer before rejecting Rene's proposal. Also, from a go-header design perspective, it would be great to know why this event system is useful, so that we could potentially incorporate it. The goal of this library is to minimize the amount of code users need to write and to solve problems for them instead of creating more.
The reason I didn't go with firing GetByHeight calls is that it was more complex to have a system that makes an indefinite GetByHeight call and then needs a watcher to see if the height came in from somewhere else so it can cancel the call. I did that in one of the commits; the implementation was more complex because we were spinning up many GetByHeight calls and watchers. Doing one is less complex, but it still requires a watcher to cancel the contexts when the height comes in from a different source. The design you have makes sense when p2p is the single source of syncing, but in our case indefinite calls need to be tracked and cancelled if the block comes in from elsewhere. I still need to do further testing to verify the flow, so it may be that I need to go back to a polling-and-watcher design.
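A minimal sketch of the watcher-and-cancel pattern described here, assuming a go-header style store whose `GetByHeight` blocks until the height is available; `awaitHeight` and `arrivedElsewhere` are illustrative names, and the import path is the usual go-header one:

```go
import (
    "context"

    "github.com/celestiaorg/go-header"
)

// awaitHeight issues one blocking GetByHeight and cancels it if the height
// arrives from another source (e.g. DA retrieval) first.
func awaitHeight[H header.Header[H]](
    ctx context.Context,
    store header.Store[H],
    height uint64,
    arrivedElsewhere <-chan struct{}, // closed by the watcher when the height lands via another path
) (H, error) {
    callCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Watcher: tear down the long-lived request when the height shows up elsewhere.
    go func() {
        select {
        case <-arrivedElsewhere:
            cancel()
        case <-callCtx.Done():
        }
    }()

    // go-header's store blocks in GetByHeight until the header is available
    // (or the context is cancelled).
    return store.GetByHeight(callCtx, height)
}
```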
Potential alternative to #2798
- `daRetrievalLoop` fires a job to fetch from DA every Celestia blocktime (6s).
- `waitForNewP2PHeights` fires a job every ev-node blocktime (100ms) to process headers between `headerStore.Height()` and `headerSyncHead.Height()`, and, if `daStore.Height()` is somehow behind `daSyncHead.Height()`, processes data between that range too.
- `ProcessHeaderRange` and `ProcessDataRange` get the syncer's context passed in, instead of artificially lowering the timeout on the ctx, as the go-header header store implicitly subscribes to the desired height and returns it when it's ready. This means that when the ev-node sync process expects a new height to be available, it will sit there and wait until it is returned (once it lands in the header store, it has gone through all forms of verification, etc.). A rough sketch of the loop shape follows the list.
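A minimal sketch of the common shape of both loops, with the intervals from the description above; `tipTrackingLoop` and `fetch` are illustrative names, not code from the PR:

```go
import (
    "context"
    "time"
)

// tipTrackingLoop runs fetch once per interval until the context is cancelled.
// The DA loop would use the Celestia blocktime (6s); the P2P loop the ev-node
// blocktime (100ms).
func tipTrackingLoop(ctx context.Context, interval time.Duration, fetch func(context.Context)) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            fetch(ctx)
        }
    }
}
```

After catch-up completes, the syncer would start both, e.g. `go tipTrackingLoop(ctx, 6*time.Second, fetchFromDA)` and `go tipTrackingLoop(ctx, 100*time.Millisecond, processNewP2PHeights)` (both function names hypothetical).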